#include "execution/executors/sort_executor.h"

namespace bustub {

SortExecutor::SortExecutor(ExecutorContext *exec_ctx, const SortPlanNode *plan,
                           std::unique_ptr<AbstractExecutor> &&child_executor)
    : AbstractExecutor(exec_ctx), plan_(plan), child_executor_(std::move(child_executor)) {}

void SortExecutor::Init() {
  res_.clear();
  child_executor_->Init();
  cursor_ = 0;
  Tuple tuple;
  RID rid;
  while (child_executor_->Next(&tuple, &rid)) {
    res_.emplace_back(tuple);
  }
  auto order_bys = plan_->GetOrderBy();
  auto schema = plan_->OutputSchema();
  std::sort(res_.begin(), res_.end(), [order_bys, schema](const Tuple &t1, const Tuple &t2) {
    for (const auto &order_by : order_bys) {
      if (order_by.second->Evaluate(&t1, schema).CompareNotEquals(order_by.second->Evaluate(&t2, schema)) ==
          CmpBool::CmpTrue) {
        if (order_by.first == OrderByType::DESC) {
          return order_by.second->Evaluate(&t1, schema).CompareGreaterThan(order_by.second->Evaluate(&t2, schema)) ==
                 CmpBool::CmpTrue;
        }
        return order_by.second->Evaluate(&t1, schema).CompareLessThan(order_by.second->Evaluate(&t2, schema)) ==
               CmpBool::CmpTrue;
      }
    }
    return true;
  });
}

auto SortExecutor::Next(Tuple *tuple, RID *rid) -> bool {
  if (cursor_ >= res_.size()) {
    return false;
  }
  *tuple = res_[cursor_];
  *rid = tuple->GetRid();
  cursor_++;
  return true;
}

}  // namespace bustub
